Kafka实现动态订阅topic 您所在的位置:网站首页 flink source kafka offset Kafka实现动态订阅topic

Kafka实现动态订阅topic

2023-07-04 09:59| 来源: 网络整理| 查看: 265

1、组件和依赖

采用spring-kafka包

org.springframework.kafka spring-kafka 2.5.3.RELEASE

2、配置类

@Bean("aiKafkaListenerFactory") public KafkaListenerContainerFactory kafkaListenerContainerFactory() { Map props = new HashMap(5); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); //轮询时间配置 props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 12000); ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory(props, new StringDeserializer(), new StringDeserializer())); return factory; }

 

3、消费类

@KafkaListener(topicPattern = "{topic前缀}.*", containerFactory = "aiKafkaListenerFactory") public void onMessage(ConsumerRecord record) { log.info("kafka消费内容:topic=" + record.topic() + ";content=" + record.value()); try { //处理业务逻辑 } catch (Exception e) { e.printStackTrace(); log.error("kafka消费失败:" + record.value()); } }

 

注意:

1、topicPattern后面为正则表达式,凡是匹配该正则的都可以消费

2、原理为spring定时轮询topic列表,符合条件的重新订阅,轮询时间配置项为ConsumerConfig.METADATA_MAX_AGE_CONFIG



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有